package com.dmp.storm.bolt.kafka;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

import java.util.Random;

public class KafkaPartitioner implements Partitioner {

	public KafkaPartitioner(VerifiableProperties props) {
	}

	@Override
	public int partition(Object key, int numPartitions) {
		int partition = 0;
		try {
			String k = (String) key;
			partition = Math.abs(k.hashCode()) % numPartitions;
		} catch (Exception e) {
			Random ro = new Random();
			partition = Math.abs(ro.nextInt(11))%numPartitions;
		}

		return partition;
	}

}
